home *** CD-ROM | disk | FTP | other *** search
/ Celestin Apprentice 5 / Apprentice-Release5.iso / Source Code / C / Applications / Python 1.3.3 / Python 133 68K / Demo / threads / condition.py < prev    next >
Text File  |  1996-05-20  |  17KB  |  476 lines

  1. # Defines classes that provide synchronization objects.  Note that use of
  2. # this module requires that your Python support threads.
  3. #
  4. #    condition()   # a POSIX-like condition-variable object
  5. #    barrier(n)    # an n-thread barrier
  6. #    event()       # an event object
  7. #    semaphore(n=1)# a semaphore object, with initial count n
  8. #
  9. # CONDITIONS
  10. #
  11. # A condition object is created via
  12. #   import this_module
  13. #   your_condition_object = this_module.condition()
  14. #
  15. # Methods:
  16. #   .acquire()
  17. #      acquire the lock associated with the condition
  18. #   .release()
  19. #      release the lock associated with the condition
  20. #   .wait()
  21. #      block the thread until such time as some other thread does a
  22. #      .signal or .broadcast on the same condition, and release the
  23. #      lock associated with the condition.  The lock associated with
  24. #      the condition MUST be in the acquired state at the time
  25. #      .wait is invoked.
  26. #   .signal()
  27. #      wake up exactly one thread (if any) that previously did a .wait
  28. #      on the condition; that thread will awaken with the lock associated
  29. #      with the condition in the acquired state.  If no threads are
  30. #      .wait'ing, this is a nop.  If more than one thread is .wait'ing on
  31. #      the condition, any of them may be awakened.
  32. #   .broadcast()
  33. #      wake up all threads (if any) that are .wait'ing on the condition;
  34. #      the threads are woken up serially, each with the lock in the
  35. #      acquired state, so should .release() as soon as possible.  If no
  36. #      threads are .wait'ing, this is a nop.
  37. #
  38. #      Note that if a thread does a .wait *while* a signal/broadcast is
  39. #      in progress, it's guaranteeed to block until a subsequent
  40. #      signal/broadcast.
  41. #
  42. #      Secret feature:  `broadcast' actually takes an integer argument,
  43. #      and will wake up exactly that many waiting threads (or the total
  44. #      number waiting, if that's less).  Use of this is dubious, though,
  45. #      and probably won't be supported if this form of condition is
  46. #      reimplemented in C.
  47. #
  48. # DIFFERENCES FROM POSIX
  49. #
  50. # + A separate mutex is not needed to guard condition data.  Instead, a
  51. #   condition object can (must) be .acquire'ed and .release'ed directly.
  52. #   This eliminates a common error in using POSIX conditions.
  53. #
  54. # + Because of implementation difficulties, a POSIX `signal' wakes up
  55. #   _at least_ one .wait'ing thread.  Race conditions make it difficult
  56. #   to stop that.  This implementation guarantees to wake up only one,
  57. #   but you probably shouldn't rely on that.
  58. #
  59. # PROTOCOL
  60. #
  61. # Condition objects are used to block threads until "some condition" is
  62. # true.  E.g., a thread may wish to wait until a producer pumps out data
  63. # for it to consume, or a server may wish to wait until someone requests
  64. # its services, or perhaps a whole bunch of threads want to wait until a
  65. # preceding pass over the data is complete.  Early models for conditions
  66. # relied on some other thread figuring out when a blocked thread's
  67. # condition was true, and made the other thread responsible both for
  68. # waking up the blocked thread and guaranteeing that it woke up with all
  69. # data in a correct state.  This proved to be very delicate in practice,
  70. # and gave conditions a bad name in some circles.
  71. #
  72. # The POSIX model addresses these problems by making a thread responsible
  73. # for ensuring that its own state is correct when it wakes, and relies
  74. # on a rigid protocol to make this easy; so long as you stick to the
  75. # protocol, POSIX conditions are easy to "get right":
  76. #
  77. #  A) The thread that's waiting for some arbitrarily-complex condition
  78. #     (ACC) to become true does:
  79. #
  80. #     condition.acquire()
  81. #     while not (code to evaluate the ACC):
  82. #           condition.wait()
  83. #           # That blocks the thread, *and* releases the lock.  When a
  84. #           # condition.signal() happens, it will wake up some thread that
  85. #           # did a .wait, *and* acquire the lock again before .wait
  86. #           # returns.
  87. #           #
  88. #           # Because the lock is acquired at this point, the state used
  89. #           # in evaluating the ACC is frozen, so it's safe to go back &
  90. #           # reevaluate the ACC.
  91. #
  92. #     # At this point, ACC is true, and the thread has the condition
  93. #     # locked.
  94. #     # So code here can safely muck with the shared state that
  95. #     # went into evaluating the ACC -- if it wants to.
  96. #     # When done mucking with the shared state, do
  97. #     condition.release()
  98. #
  99. #  B) Threads that are mucking with shared state that may affect the
  100. #     ACC do:
  101. #
  102. #     condition.acquire()
  103. #     # muck with shared state
  104. #     condition.release()
  105. #     if it's possible that ACC is true now:
  106. #         condition.signal() # or .broadcast()
  107. #
  108. #     Note:  You may prefer to put the "if" clause before the release().
  109. #     That's fine, but do note that anyone waiting on the signal will
  110. #     stay blocked until the release() is done (since acquiring the
  111. #     condition is part of what .wait() does before it returns).
  112. #
  113. # TRICK OF THE TRADE
  114. #
  115. # With simpler forms of conditions, it can be impossible to know when
  116. # a thread that's supposed to do a .wait has actually done it.  But
  117. # because this form of condition releases a lock as _part_ of doing a
  118. # wait, the state of that lock can be used to guarantee it.
  119. #
  120. # E.g., suppose thread A spawns thread B and later wants to wait for B to
  121. # complete:
  122. #
  123. # In A:                             In B:
  124. #
  125. # B_done = condition()              ... do work ...
  126. # B_done.acquire()                  B_done.acquire(); B_done.release()
  127. # spawn B                           B_done.signal()
  128. # ... some time later ...           ... and B exits ...
  129. # B_done.wait()
  130. #
  131. # Because B_done was in the acquire'd state at the time B was spawned,
  132. # B's attempt to acquire B_done can't succeed until A has done its
  133. # B_done.wait() (which releases B_done).  So B's B_done.signal() is
  134. # guaranteed to be seen by the .wait().  Without the lock trick, B
  135. # may signal before A .waits, and then A would wait forever.
  136. #
  137. # BARRIERS
  138. #
  139. # A barrier object is created via
  140. #   import this_module
  141. #   your_barrier = this_module.barrier(num_threads)
  142. #
  143. # Methods:
  144. #   .enter()
  145. #      the thread blocks until num_threads threads in all have done
  146. #      .enter().  Then the num_threads threads that .enter'ed resume,
  147. #      and the barrier resets to capture the next num_threads threads
  148. #      that .enter it.
  149. #
  150. # EVENTS
  151. #
  152. # An event object is created via
  153. #   import this_module
  154. #   your_event = this_module.event()
  155. #
  156. # An event has two states, `posted' and `cleared'.  An event is
  157. # created in the cleared state.
  158. #
  159. # Methods:
  160. #
  161. #   .post()
  162. #      Put the event in the posted state, and resume all threads
  163. #      .wait'ing on the event (if any).
  164. #
  165. #   .clear()
  166. #      Put the event in the cleared state.
  167. #
  168. #   .is_posted()
  169. #      Returns 0 if the event is in the cleared state, or 1 if the event
  170. #      is in the posted state.
  171. #
  172. #   .wait()
  173. #      If the event is in the posted state, returns immediately.
  174. #      If the event is in the cleared state, blocks the calling thread
  175. #      until the event is .post'ed by another thread.
  176. #
  177. # Note that an event, once posted, remains posted until explicitly
  178. # cleared.  Relative to conditions, this is both the strength & weakness
  179. # of events.  It's a strength because the .post'ing thread doesn't have to
  180. # worry about whether the threads it's trying to communicate with have
  181. # already done a .wait (a condition .signal is seen only by threads that
  182. # do a .wait _prior_ to the .signal; a .signal does not persist).  But
  183. # it's a weakness because .clear'ing an event is error-prone:  it's easy
  184. # to mistakenly .clear an event before all the threads you intended to
  185. # see the event get around to .wait'ing on it.  But so long as you don't
  186. # need to .clear an event, events are easy to use safely.
  187. #
  188. # SEMAPHORES
  189. #
  190. # A semaphore object is created via
  191. #   import this_module
  192. #   your_semaphore = this_module.semaphore(count=1)
  193. #
  194. # A semaphore has an integer count associated with it.  The initial value
  195. # of the count is specified by the optional argument (which defaults to
  196. # 1) passed to the semaphore constructor.
  197. #
  198. # Methods:
  199. #
  200. #   .p()
  201. #      If the semaphore's count is greater than 0, decrements the count
  202. #      by 1 and returns.
  203. #      Else if the semaphore's count is 0, blocks the calling thread
  204. #      until a subsequent .v() increases the count.  When that happens,
  205. #      the count will be decremented by 1 and the calling thread resumed.
  206. #
  207. #   .v()
  208. #      Increments the semaphore's count by 1, and wakes up a thread (if
  209. #      any) blocked by a .p().  It's an (detected) error for a .v() to
  210. #      increase the semaphore's count to a value larger than the initial
  211. #      count.
  212.  
  213. import thread
  214.  
  215. class condition:
  216.     def __init__(self):
  217.         # the lock actually used by .acquire() and .release()
  218.         self.mutex = thread.allocate_lock()
  219.  
  220.         # lock used to block threads until a signal
  221.         self.checkout = thread.allocate_lock()
  222.         self.checkout.acquire()
  223.  
  224.         # internal critical-section lock, & the data it protects
  225.         self.idlock = thread.allocate_lock()
  226.         self.id = 0
  227.         self.waiting = 0  # num waiters subject to current release
  228.         self.pending = 0  # num waiters awaiting next signal
  229.         self.torelease = 0      # num waiters to release
  230.         self.releasing = 0      # 1 iff release is in progress
  231.  
  232.     def acquire(self):
  233.         self.mutex.acquire()
  234.  
  235.     def release(self):
  236.         self.mutex.release()
  237.  
  238.     def wait(self):
  239.         mutex, checkout, idlock = self.mutex, self.checkout, self.idlock
  240.         if not mutex.locked():
  241.             raise ValueError, \
  242.                   "condition must be .acquire'd when .wait() invoked"
  243.  
  244.         idlock.acquire()
  245.         myid = self.id
  246.         self.pending = self.pending + 1
  247.         idlock.release()
  248.  
  249.         mutex.release()
  250.  
  251.         while 1:
  252.             checkout.acquire(); idlock.acquire()
  253.             if myid < self.id:
  254.                 break
  255.             checkout.release(); idlock.release()
  256.  
  257.         self.waiting = self.waiting - 1
  258.         self.torelease = self.torelease - 1
  259.         if self.torelease:
  260.             checkout.release()
  261.         else:
  262.             self.releasing = 0
  263.             if self.waiting == self.pending == 0:
  264.                 self.id = 0
  265.         idlock.release()
  266.         mutex.acquire()
  267.  
  268.     def signal(self):
  269.         self.broadcast(1)
  270.  
  271.     def broadcast(self, num = -1):
  272.         if num < -1:
  273.             raise ValueError, '.broadcast called with num ' + `num`
  274.         if num == 0:
  275.             return
  276.         self.idlock.acquire()
  277.         if self.pending:
  278.             self.waiting = self.waiting + self.pending
  279.             self.pending = 0
  280.             self.id = self.id + 1
  281.         if num == -1:
  282.             self.torelease = self.waiting
  283.         else:
  284.             self.torelease = min( self.waiting,
  285.                                   self.torelease + num )
  286.         if self.torelease and not self.releasing:
  287.             self.releasing = 1
  288.             self.checkout.release()
  289.         self.idlock.release()
  290.  
  291. class barrier:
  292.     def __init__(self, n):
  293.         self.n = n
  294.         self.togo = n
  295.         self.full = condition()
  296.  
  297.     def enter(self):
  298.         full = self.full
  299.         full.acquire()
  300.         self.togo = self.togo - 1
  301.         if self.togo:
  302.             full.wait()
  303.         else:
  304.             self.togo = self.n
  305.             full.broadcast()
  306.         full.release()
  307.  
  308. class event:
  309.     def __init__(self):
  310.         self.state  = 0
  311.         self.posted = condition()
  312.  
  313.     def post(self):
  314.         self.posted.acquire()
  315.         self.state = 1
  316.         self.posted.broadcast()
  317.         self.posted.release()
  318.  
  319.     def clear(self):
  320.         self.posted.acquire()
  321.         self.state = 0
  322.         self.posted.release()
  323.  
  324.     def is_posted(self):
  325.         self.posted.acquire()
  326.         answer = self.state
  327.         self.posted.release()
  328.         return answer
  329.  
  330.     def wait(self):
  331.         self.posted.acquire()
  332.         if not self.state:
  333.             self.posted.wait()
  334.         self.posted.release()
  335.  
  336. class semaphore:
  337.     def __init__(self, count=1):
  338.         if count <= 0:
  339.             raise ValueError, 'semaphore count %d; must be >= 1' % count
  340.         self.count = count
  341.         self.maxcount = count
  342.         self.nonzero = condition()
  343.  
  344.     def p(self):
  345.         self.nonzero.acquire()
  346.         while self.count == 0:
  347.             self.nonzero.wait()
  348.         self.count = self.count - 1
  349.         self.nonzero.release()
  350.  
  351.     def v(self):
  352.         self.nonzero.acquire()
  353.         if self.count == self.maxcount:
  354.             raise ValueError, '.v() tried to raise semaphore count above ' \
  355.                   'initial value ' + `maxcount`
  356.         self.count = self.count + 1
  357.         self.nonzero.signal()
  358.         self.nonzero.release()
  359.  
  360. # The rest of the file is a test case, that runs a number of parallelized
  361. # quicksorts in parallel.  If it works, you'll get about 600 lines of
  362. # tracing output, with a line like
  363. #     test passed! 209 threads created in all
  364. # as the last line.  The content and order of preceding lines will
  365. # vary across runs.
  366.  
  367. def _new_thread(func, *args):
  368.     global TID
  369.     tid.acquire(); id = TID = TID+1; tid.release()
  370.     io.acquire(); alive.append(id); \
  371.                   print 'starting thread', id, '--', len(alive), 'alive'; \
  372.                   io.release()
  373.     thread.start_new_thread( func, (id,) + args )
  374.  
  375. def _qsort(tid, a, l, r, finished):
  376.     # sort a[l:r]; post finished when done
  377.     io.acquire(); print 'thread', tid, 'qsort', l, r; io.release()
  378.     if r-l > 1:
  379.         pivot = a[l]
  380.         j = l+1   # make a[l:j] <= pivot, and a[j:r] > pivot
  381.         for i in range(j, r):
  382.             if a[i] <= pivot:
  383.                 a[j], a[i] = a[i], a[j]
  384.                 j = j + 1
  385.         a[l], a[j-1] = a[j-1], pivot
  386.  
  387.         l_subarray_sorted = event()
  388.         r_subarray_sorted = event()
  389.         _new_thread(_qsort, a, l, j-1, l_subarray_sorted)
  390.         _new_thread(_qsort, a, j, r,   r_subarray_sorted)
  391.         l_subarray_sorted.wait()
  392.         r_subarray_sorted.wait()
  393.  
  394.     io.acquire(); print 'thread', tid, 'qsort done'; \
  395.                   alive.remove(tid); io.release()
  396.     finished.post()
  397.  
  398. def _randarray(tid, a, finished):
  399.     io.acquire(); print 'thread', tid, 'randomizing array'; \
  400.                   io.release()
  401.     for i in range(1, len(a)):
  402.         wh.acquire(); j = randint(0,i); wh.release()
  403.         a[i], a[j] = a[j], a[i]
  404.     io.acquire(); print 'thread', tid, 'randomizing done'; \
  405.                   alive.remove(tid); io.release()
  406.     finished.post()
  407.  
  408. def _check_sort(a):
  409.     if a != range(len(a)):
  410.         raise ValueError, ('a not sorted', a)
  411.  
  412. def _run_one_sort(tid, a, bar, done):
  413.     # randomize a, and quicksort it
  414.     # for variety, all the threads running this enter a barrier
  415.     # at the end, and post `done' after the barrier exits
  416.     io.acquire(); print 'thread', tid, 'randomizing', a; \
  417.                   io.release()
  418.     finished = event()
  419.     _new_thread(_randarray, a, finished)
  420.     finished.wait()
  421.  
  422.     io.acquire(); print 'thread', tid, 'sorting', a; io.release()
  423.     finished.clear()
  424.     _new_thread(_qsort, a, 0, len(a), finished)
  425.     finished.wait()
  426.     _check_sort(a)
  427.  
  428.     io.acquire(); print 'thread', tid, 'entering barrier'; \
  429.                   io.release()
  430.     bar.enter()
  431.     io.acquire(); print 'thread', tid, 'leaving barrier'; \
  432.                   io.release()
  433.     io.acquire(); alive.remove(tid); io.release()
  434.     bar.enter() # make sure they've all removed themselves from alive
  435.                 ##  before 'done' is posted
  436.     bar.enter() # just to be cruel
  437.     done.post()
  438.  
  439. def test():
  440.     global TID, tid, io, wh, randint, alive
  441.     import whrandom
  442.     randint = whrandom.randint
  443.  
  444.     TID = 0                             # thread ID (1, 2, ...)
  445.     tid = thread.allocate_lock()        # for changing TID
  446.     io  = thread.allocate_lock()        # for printing, and 'alive'
  447.     wh  = thread.allocate_lock()        # for calls to whrandom
  448.     alive = []                          # IDs of active threads
  449.  
  450.     NSORTS = 5
  451.     arrays = []
  452.     for i in range(NSORTS):
  453.         arrays.append( range( (i+1)*10 ) )
  454.  
  455.     bar = barrier(NSORTS)
  456.     finished = event()
  457.     for i in range(NSORTS):
  458.         _new_thread(_run_one_sort, arrays[i], bar, finished)
  459.     finished.wait()
  460.  
  461.     print 'all threads done, and checking results ...'
  462.     if alive:
  463.         raise ValueError, ('threads still alive at end', alive)
  464.     for i in range(NSORTS):
  465.         a = arrays[i]
  466.         if len(a) != (i+1)*10:
  467.             raise ValueError, ('length of array', i, 'screwed up')
  468.         _check_sort(a)
  469.  
  470.     print 'test passed!', TID, 'threads created in all'
  471.  
  472. if __name__ == '__main__':
  473.     test()
  474.  
  475. # end of module
  476.